/*
 * Copyright (C) 2016, apexes.net. All rights reserved.
 * 
 *        http://www.apexes.net
 * 
 */
package net.apexes.wsonrpc.core;

import net.apexes.wsonrpc.core.exception.JsonException;
import net.apexes.wsonrpc.core.exception.RemoteException;
import net.apexes.wsonrpc.core.exception.WsonrpcException;
import net.apexes.wsonrpc.json.JsonImplementor;
import net.apexes.wsonrpc.json.JsonRpcMessage;
import net.apexes.wsonrpc.json.JsonRpcRequest;
import net.apexes.wsonrpc.json.JsonRpcResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.concurrent.Future;

/**
 * Engine that sends JSON-RPC notifications and requests over a {@link WebSocketSession}
 * and dispatches the messages received on it.
 *
 * <p>Every outgoing request registers a {@link WsonrpcCallback} under the request id in an
 * internal callback cache; when a response carrying that id is received, the callback is taken
 * out of the cache and handed the response. Incoming requests are executed through the wrapped
 * {@link JsonRpcEngine} and the resulting response, if any, is transmitted back.
 *
 * <p>When the configuration supplies a {@link WsonrpcLogger}, it is passed to the transmit and
 * parse calls and receives processing errors; otherwise processing errors are written to this
 * class's SLF4J logger so that they are never dropped silently.
 *
 * @author <a href="mailto:hedyn@foxmail.com">HeDYn</a>
 *
 */
public class WsonrpcEngine {

    private static final Logger LOG = LoggerFactory.getLogger(WsonrpcEngine.class);

    private final WsonrpcConfig config;
    private final JsonRpcEngine jsonRpcEngine;
    /** Optional logger taken from the configuration; may be {@code null}. */
    private final WsonrpcLogger wsonrpcLogger;
    /** Callbacks of requests that have been sent, keyed by request id. */
    private final WsonrpcCallbackCache callbackCache;

    /**
     * Creates an engine from the given configuration.
     *
     * @param config supplies the JSON implementor, binary wrapper, optional logger and id generator
     */
    public WsonrpcEngine(WsonrpcConfig config) {
        this.config = config;
        this.jsonRpcEngine = new JsonRpcEngine(config.getJsonImplementor(), config.getBinaryWrapper());
        this.wsonrpcLogger = config.getWsonrpcLogger();
        this.callbackCache = new WsonrpcCallbackCache();
    }

    /**
     * Returns the configuration this engine was created with.
     *
     * @return the configuration passed to the constructor
     */
    public final WsonrpcConfig getConfig() {
        return config;
    }

    /**
     * Returns the service registry of the wrapped {@link JsonRpcEngine}.
     *
     * @return the service registry
     */
    public ServiceRegistry getServiceRegistry() {
        return jsonRpcEngine.getServiceRegistry();
    }

    /**
     * Sends a notice (a message created with {@code createNotice}, for which no callback is
     * registered) to the given session.
     *
     * @param session the session to send on; must not be {@code null}
     * @param method the remote method name
     * @param args the method arguments
     * @param trace trace value forwarded to {@code JsonRpcEngine.createNotice}
     * @throws NullPointerException if {@code session} is {@code null}
     * @throws IOException declared for failures while transmitting
     */
    public void notify(WebSocketSession session, String method, Object[] args, String trace) throws IOException {
        if (session == null) {
            throw new NullPointerException("session");
        }
        JsonRpcRequest request = jsonRpcEngine.createNotice(method, args, trace);
        if (wsonrpcLogger != null) {
            jsonRpcEngine.transmit(session, request, wsonrpcLogger, session.getId());
        } else {
            jsonRpcEngine.transmit(session, request, session.getId());
        }
    }

    /**
     * Sends a request and returns a future for its result.
     *
     * @param session the session to send on; must not be {@code null}
     * @param method the remote method name
     * @param args the method arguments
     * @param returnType the type handed to the created {@link WsonrpcFuture}
     * @param <T> the result type
     * @return the future registered as the callback of the request
     * @throws NullPointerException if {@code session} is {@code null}
     * @throws IOException if creating or transmitting the request fails with an I/O error
     * @throws WsonrpcException if creating or transmitting the request fails for any other reason
     */
    public <T> Future<T> request(WebSocketSession session, String method, Object[] args, Type returnType)
            throws IOException, WsonrpcException {
        String id = config.getIdGenerator().next();
        WsonrpcFuture<T> future = new WsonrpcFuture<>(this, returnType);
        request(session, id, method, args, future);
        return future;
    }

    /**
     * Sends a request whose response is delivered to the given callback.
     *
     * @param session the session to send on; must not be {@code null}
     * @param method the remote method name
     * @param args the method arguments
     * @param callback the callback registered under the generated request id
     * @throws NullPointerException if {@code session} is {@code null}
     * @throws IOException if creating or transmitting the request fails with an I/O error
     * @throws WsonrpcException if creating or transmitting the request fails for any other reason
     */
    public void request(WebSocketSession session, String method, Object[] args, WsonrpcCallback callback)
            throws IOException, WsonrpcException {
        String id = config.getIdGenerator().next();
        request(session, id, method, args, callback);
    }

    /**
     * Registers the callback under {@code id}, then creates and transmits the request. If that
     * fails, the callback is taken out of the cache again before the failure is propagated.
     */
    private void request(WebSocketSession session, String id, String method, Object[] args, WsonrpcCallback callback)
            throws IOException, WsonrpcException {
        if (session == null) {
            throw new NullPointerException("session is null");
        }
        callbackCache.put(id, callback);
        try {
            JsonRpcRequest request = jsonRpcEngine.createRequest(method, args, id);
            if (wsonrpcLogger != null) {
                jsonRpcEngine.transmit(session, request, wsonrpcLogger, session.getId());
            } else {
                jsonRpcEngine.transmit(session, request, session.getId());
            }
        } catch (Throwable t) {
            // The request was not sent, so no response will ever claim this callback.
            callbackCache.out(id);
            if (t instanceof IOException) {
                throw (IOException) t;
            }
            if (t instanceof WsonrpcException) {
                // Already the declared type: rethrow as-is instead of wrapping it a second time.
                throw (WsonrpcException) t;
            }
            throw new WsonrpcException(t);
        }
    }

    /**
     * Handles received JSON data in binary form.
     *
     * <p>Exceptions raised while parsing or dispatching the message are reported (to the
     * configured {@link WsonrpcLogger}, or to the SLF4J logger when none is configured) and
     * are not propagated to the caller.
     *
     * @param session the session the data was received on
     * @param bytes the received data
     * @throws IOException an {@link EOFException} if {@code session} is {@code null}
     */
    public void handle(WebSocketSession session, byte[] bytes) throws IOException {
        if (session == null) {
            throw new EOFException("session is null");
        }
        try {
            JsonRpcMessage msg = parseOnReceive(session, bytes);
            handle(session, msg);
        } catch (Exception e) {
            reportError(session, e);
        }
    }

    /**
     * Handles received JSON data in text form.
     *
     * <p>Exceptions raised while parsing or dispatching the message are reported (to the
     * configured {@link WsonrpcLogger}, or to the SLF4J logger when none is configured) and
     * are not propagated to the caller.
     *
     * @param session the session the data was received on
     * @param text the received text
     * @throws IOException an {@link EOFException} if {@code session} is {@code null}
     */
    public void handle(WebSocketSession session, String text) throws IOException {
        if (session == null) {
            throw new EOFException("session is null");
        }
        try {
            JsonRpcMessage msg = parseOnReceive(session, text);
            handle(session, msg);
        } catch (Exception e) {
            reportError(session, e);
        }
    }

    /** Parses binary data, passing the configured logger to the engine when there is one. */
    private JsonRpcMessage parseOnReceive(WebSocketSession session, byte[] bytes) throws Exception {
        if (wsonrpcLogger != null) {
            return jsonRpcEngine.parseOnReceive(bytes, wsonrpcLogger, session.getId());
        } else {
            return jsonRpcEngine.parseOnReceive(bytes, session.getId());
        }
    }

    /** Parses text data, passing the configured logger to the engine when there is one. */
    private JsonRpcMessage parseOnReceive(WebSocketSession session, String text) throws Exception {
        if (wsonrpcLogger != null) {
            return jsonRpcEngine.parseOnReceive(text, wsonrpcLogger, session.getId());
        } else {
            return jsonRpcEngine.parseOnReceive(text, session.getId());
        }
    }

    /** Routes a parsed message to the request or response handler; other messages are ignored. */
    private void handle(WebSocketSession session, JsonRpcMessage msg) throws IOException {
        if (msg.isRequest()) {
            handleRequest(session, (JsonRpcRequest) msg);
        } else if (msg instanceof JsonRpcResponse) {
            handleResponse((JsonRpcResponse) msg);
        }
    }

    /**
     * Executes an incoming request and transmits the resulting response, if any.
     *
     * @param session the session the request was received on
     * @param request the received request
     */
    protected void handleRequest(WebSocketSession session, JsonRpcRequest request) {
        transmit(session, execute(session, request));
    }

    /**
     * Delivers an incoming response to the callback registered under its id.
     *
     * <p>A response without an id is ignored. If no callback is registered for the id, a warning
     * is logged. If the callback throws while accepting the response, the throwable is passed
     * to the callback's {@code setError}.
     *
     * @param response the received response
     */
    protected void handleResponse(JsonRpcResponse response) {
        String id = response.getId();
        if (id == null) {
            return;
        }
        WsonrpcCallback callback = callbackCache.out(id);
        if (callback == null) {
            LOG.warn("callback is null. id={}", id);
        } else {
            try {
                callback.setValue(response);
            } catch (Throwable t) {
                callback.setError(t);
            }
        }
    }

    /**
     * Executes a request through the wrapped {@link JsonRpcEngine}.
     *
     * @param session the session the request was received on (not used by this implementation)
     * @param request the request to execute
     * @return the response produced by the engine; {@link #transmit} skips a {@code null} response
     */
    protected JsonRpcResponse execute(WebSocketSession session, JsonRpcRequest request) {
        return jsonRpcEngine.execute(request);
    }

    /**
     * Transmits a response on the given session. A {@code null} response is skipped.
     *
     * <p>Exceptions raised while transmitting are reported (to the configured
     * {@link WsonrpcLogger}, or to the SLF4J logger when none is configured) and are not
     * propagated to the caller.
     *
     * @param session the session to send on
     * @param resp the response to send, or {@code null} to send nothing
     */
    protected void transmit(WebSocketSession session, JsonRpcResponse resp) {
        if (resp != null) {
            try {
                if (wsonrpcLogger != null) {
                    jsonRpcEngine.transmit(session, resp, wsonrpcLogger, session.getId());
                } else {
                    jsonRpcEngine.transmit(session, resp, session.getId());
                }
            } catch (Exception e) {
                reportError(session, e);
            }
        }
    }

    /**
     * Returns the JSON implementor of the wrapped {@link JsonRpcEngine}.
     *
     * @return the JSON implementor
     */
    protected JsonImplementor getJsonImplementor() {
        return jsonRpcEngine.getJsonImplementor();
    }

    /**
     * Creates a response for the given id and result using the JSON implementor.
     *
     * @param id the request id the response belongs to
     * @param result the result value
     * @return the created response
     */
    protected JsonRpcResponse toResponse(String id, Object result) {
        return getJsonImplementor().createResponse(id, result);
    }

    /**
     * Converts a response into a value of the requested type by delegating to the wrapped
     * {@link JsonRpcEngine}.
     *
     * @param resp the response to convert
     * @param returnType the target type
     * @param <T> the result type
     * @return the converted value
     * @throws JsonException declared by the delegate
     * @throws RemoteException declared by the delegate
     */
    protected <T> T fromResponse(JsonRpcResponse resp, Type returnType) throws JsonException, RemoteException {
        return jsonRpcEngine.fromResponse(resp, returnType);
    }

    /**
     * Reports a processing error for the given session: to the configured {@link WsonrpcLogger}
     * when there is one, otherwise to the SLF4J logger so the error is not lost.
     */
    private void reportError(WebSocketSession session, Exception error) {
        if (wsonrpcLogger != null) {
            wsonrpcLogger.onError(session.getId(), error);
        } else {
            // A trailing Throwable argument is logged by SLF4J with its stack trace.
            LOG.error("wsonrpc error. sessionId={}", session.getId(), error);
        }
    }
}
